/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*******************************************************************************
 * $Id$
 *
 *******************************************************************************/

/*
 * we_splclient.h
 *
 *  Created on: Oct 20, 2011
 *      Author: bpaul
 */

#pragma once

#include "resourcemanager.h"

#include "we_messages.h"
#include "calpontsystemcatalog.h"

namespace WriteEngine
{
class WESplClient;  // forward decleration

// Structure for holding the Out of Range data from the BRMReport
// This class is also used by we_sdhandler to hold the agregate info.
class WEColOORInfo  // Column Out-Of-Range Info
{
 public:
  WEColOORInfo() : fColNum(0), fColType(execplan::CalpontSystemCatalog::INT), fNoOfOORs(0)
  {
  }
  ~WEColOORInfo()
  {
  }

 public:
  int fColNum;
  execplan::CalpontSystemCatalog::ColDataType fColType;
  std::string fColName;
  int fNoOfOORs;
};
typedef std::vector<WEColOORInfo> WEColOorVec;

//------------------------------------------------------------------------------
class WESdHandlerException : public std::exception
{
 public:
  std::string fWhat;
  WESdHandlerException(std::string& What) throw()
  {
    fWhat = What;
  }
  virtual ~WESdHandlerException() throw()
  {
  }
  virtual const char* what() const throw()
  {
    return fWhat.c_str();
  }
};
//------------------------------------------------------------------------------

//------------------------------------------------------------------------------

class WESplClientRunner
{
 public:
  WESplClientRunner(WESplClient& Sc) : fOwner(Sc)
  { /* ctor */
  }
  virtual ~WESplClientRunner()
  { /* dtor */
  }
  void operator()();

 public:
  WESplClient& fOwner;
};

//------------------------------------------------------------------------------

class WESplClient
{
 public:
  WESplClient(WESDHandler& Sdh, int PmId);
  virtual ~WESplClient();

  void setup();
  void startClientThread();
  void sendAndRecv();
  void send();
  void recv();
  void write(const messageqcpp::ByteStream& Msg);
  void read(messageqcpp::SBS& Sbs);
  void add2SendQueue(const messageqcpp::SBS& Sbs);
  void clearSendQueue();
  int getSendQSize();

  void printStats();
  void onConnect();
  void onDisconnect();

  unsigned int getRowTx() const
  {
    return fRowTx;
  }
  messageqcpp::BSSizeType getBytesRcv() const
  {
    return fBytesRcv;
  }
  messageqcpp::BSSizeType getBytesTx()
  {
    boost::mutex::scoped_lock aLock(fTxMutex);
    return fBytesTx;
  }
  boost::thread* getFpThread() const
  {
    return fpThread;
  }
  time_t getLastInTime()
  {
    boost::mutex::scoped_lock aLock(fLastInMutex);
    return (fLastInTime > 0) ? fLastInTime : fStartTime;  // BUG 4309
  }
  time_t getStartTime() const
  {
    return fStartTime;
  }
  time_t getElapsedTime()
  {
    return (getLastInTime() - getStartTime());
  }
  bool isCpiStarted() const
  {
    return fCpiStarted;
  }
  bool isCpiPassed() const
  {
    return fCpiPassed;
  }
  bool isCpiFailed() const
  {
    return fCpiFailed;
  }
  bool isBrmRptRcvd() const
  {
    return fBrmRptRcvd;
  }
  int getRollbackRslt() const
  {
    return fRollbackRslt;
  }
  int getCleanupRslt() const
  {
    return fCleanupRslt;
  }
  bool getSendFlag() const
  {
    return fSend;
  }
  unsigned int getPmId() const
  {
    return fPmId;
  }
  unsigned int getDbRootCnt() const
  {
    return fDbrCnt;
  }
  unsigned int getDbRootVar()
  {
    boost::mutex::scoped_lock aLock(fDataRqstMutex);
    return fDbrVar;
  }
  int getDataRqstCount()
  {
    boost::mutex::scoped_lock aLock(fDataRqstMutex);
    return fDataRqstCnt;
  }
  long getRdSecTo() const
  {
    return fRdSecTo;
  }
  bool isConnected() const
  {
    return fConnected;
  }
  bool isContinue() const
  {
    return fContinue;
  }
  const std::string& getServer() const
  {
    return fServer;
  }
  const std::string& getIpAddress() const
  {
    return fIpAddress;
  }
  void setBytesRcv(messageqcpp::BSSizeType BytesRcv)
  {
    fBytesRcv = BytesRcv;
  }
  void setBytesTx(messageqcpp::BSSizeType BytesTx)
  {
    boost::mutex::scoped_lock aLock(fTxMutex);
    fBytesTx = BytesTx;
    aLock.unlock();
  }
  void updateBytesTx(messageqcpp::BSSizeType fBytes)
  {
    boost::mutex::scoped_lock aLock(fTxMutex);
    fBytesTx += fBytes;
    aLock.unlock();
  }
  void setConnected(bool Connected)
  {
    fConnected = Connected;
  }
  void setContinue(bool Continue)
  {
    fContinue = Continue;
  }
  void setFpThread(boost::thread* pThread)
  {
    fpThread = pThread;
  }
  void setLastInTime(time_t LastInTime)
  {
    fLastInTime = LastInTime;
  }
  void setStartTime(time_t StartTime)
  {
    boost::mutex::scoped_lock aLock(fLastInMutex);
    fStartTime = StartTime;
    aLock.lock();
  }
  void setSendFlag(bool Send)
  {
    fSend = Send;
  }
  void setCpiStarted(bool Start)
  {
    fCpiStarted = Start;
  }
  void setCpiPassed(bool Pass)
  {
    setLastInTime(time(0));
    fCpiPassed = Pass;
  }
  void setCpiFailed(bool Fail)
  {
    setLastInTime(time(0));
    fCpiFailed = Fail;
    fRowsUploadInfo.fRowsRead = 0;
    fRowsUploadInfo.fRowsInserted = 0;
  }
  void setBrmRptRcvd(bool Rcvd)
  {
    fBrmRptRcvd = Rcvd;
  }
  void setRollbackRslt(int Rslt)
  {
    fRollbackRslt = Rslt;
  }
  void setCleanupRslt(int Rslt)
  {
    fCleanupRslt = Rslt;
  }
  void setPmId(unsigned int PmId)
  {
    fPmId = PmId;
  }
  void setDbRootCnt(unsigned int DbrCnt)
  {
    fDbrCnt = DbrCnt;
  }
  void resetDbRootVar()
  {
    boost::mutex::scoped_lock aLock(fDataRqstMutex);
    fDbrVar = fDbrCnt;
    aLock.unlock();
  }
  void decDbRootVar()
  {
    boost::mutex::scoped_lock aLock(fDataRqstMutex);

    if (fDbrVar > 0)
      --fDbrVar;

    aLock.unlock();
  }
  void setRdSecTo(long RdSecTo)
  {
    fRdSecTo = RdSecTo;
  }
  void setDataRqstCount(int DataRqstCnt)
  {
    boost::mutex::scoped_lock aLock(fDataRqstMutex);
    fDataRqstCnt = DataRqstCnt;
    aLock.unlock();
  }
  void decDataRqstCount()
  {
    boost::mutex::scoped_lock aLock(fDataRqstMutex);

    if (fDataRqstCnt > 0)
      --fDataRqstCnt;

    aLock.unlock();
  }
  void incDataRqstCount()
  {
    boost::mutex::scoped_lock aLock(fDataRqstMutex);
    ++fDataRqstCnt;
    aLock.unlock();
  }
  void setServer(const std::string& Server)
  {
    fServer = Server;
  }
  void setIpAddress(const std::string& IpAddr)
  {
    fIpAddress = IpAddr;
  }
  void updateRowTx(unsigned int aCnt)
  {
    fRowTx += aCnt;
  }
  void resetRowTx()
  {
    fRowTx = 0;
  }

 private:
  bool fContinue;
  bool fConnected;
  unsigned int fPmId;
  unsigned int fDbrCnt;
  unsigned int fDbrVar;  // Var to keep track next PM to send.
  int fDataRqstCnt;      // Data request count
  long fRdSecTo;         // read timeout sec
  unsigned int fRowTx;   // No. Of Rows Transmitted
  messageqcpp::BSSizeType fBytesTx;
  messageqcpp::BSSizeType fBytesRcv;
  time_t fLastInTime;
  time_t fStartTime;
  bool fSend;
  bool fCpiStarted;
  bool fCpiPassed;
  bool fCpiFailed;
  bool fBrmRptRcvd;
  int fRollbackRslt;
  int fCleanupRslt;

  boost::mutex fTxMutex;  // mutex for TxBytes
  boost::mutex fDataRqstMutex;
  boost::mutex fWriteMutex;
  boost::mutex fSentQMutex;
  boost::mutex fLastInMutex;
  typedef std::queue<messageqcpp::SBS> WESendQueue;
  WESendQueue fSendQueue;

  std::string fServer;
  std::string fIpAddress;
  boost::shared_ptr<messageqcpp::MessageQueueClient> fClnt;
  boost::thread* fpThread;
  WESDHandler& fOwner;

  class WERowsUploadInfo
  {
   public:
    WERowsUploadInfo() : fRowsRead(0), fRowsInserted(0)
    {
    }
    ~WERowsUploadInfo()
    {
    }

   public:
    int64_t fRowsRead;
    int64_t fRowsInserted;
  };
  WERowsUploadInfo fRowsUploadInfo;
  WEColOorVec fColOorVec;
  std::string fBadDataFile;
  std::string fErrInfoFile;

  void setRowsUploadInfo(int64_t RowsRead, int64_t RowsInserted);
  void add2ColOutOfRangeInfo(int ColNum, execplan::CalpontSystemCatalog::ColDataType ColType,
                             std::string& ColName, int NoOfOors);
  void setBadDataFile(const std::string& BadDataFile);
  void setErrInfoFile(const std::string& ErrInfoFile);

  friend class WESDHandler;
};
//------------------------------------------------------------------------------

} /* namespace WriteEngine */
